Iris options (dask parallel processing) #2462
Conversation
lib/iris/__init__.py
Outdated
    from iris._deprecation import IrisDeprecation, warn_deprecated
    import iris.fileformats
    import iris.io
    import iris.options
iris.options is not being used in this module, so by importing it here, you are catering to someone being able to do:
    import iris
    iris.options.parallel(scheduler='multiprocessing', num_workers=6)
I don't think this is a good thing. I would prefer users have to import iris.options directly
I disagree; I think this is a good thing. We want to make options as easy to use as possible. It makes complete sense that the controlling options for Iris come as part of the primary import.
And it's not just me who thinks so -- dask does this, as does iris.FUTURE, by dint of being defined in the Iris __init__ already. Adding options to the __init__ was actually a design choice I considered, and one I only dropped because of the potential size of the options module.
@dkillick It's a moot point at the moment since iris.options doesn't actually exist 😜
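For readers skimming this thread, a minimal sketch of the two access patterns being debated, assuming the proposed iris.options module and its parallel option exist as described elsewhere in this PR (they are still only a proposal here):

    # Sketch only: iris.options is the proposal under review, so neither
    # import is guaranteed to work against released Iris.

    # Explicit import, as preferred in the first comment.
    import iris.options
    iris.options.parallel(scheduler='multiprocessing', num_workers=6)

    # Access via the top-level package, which only works if iris/__init__.py
    # imports iris.options itself (the behaviour argued for above, mirroring
    # how iris.FUTURE is exposed).
    import iris
    iris.options.parallel(scheduler='multiprocessing', num_workers=6)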
lib/iris/options.py
Outdated
    using multiprocessing with six worker processes::

        >>> iris.options.parallel(scheduler='multiprocessing', num_workers=6)
        >>> iris.load('my_dataset.nc')
To be consistent with the other code examples, you should remove the python interpreter prompts i.e. the >>> and the ...
👍 And indent by four spaces? I didn't look very hard into how this is done elsewhere...
To be consistent with the other code examples
Actually, can you provide an example of this please? All the examples I just found in cube.py use the chevrons...
I may be wrong. I didn't realise it would be so difficult for you to find examples
There's no need to get snippy. All I was after was the example you found, as they were different to the examples I found...
Discussed between @dkillick and @pp-mo in an offline conversation...

Mocky tests added with some much-appreciated help from @bjlittle! Think this is ready to go now, so long as Travis is happy...
lib/iris/_lazy_data.py
Outdated
    .. note::
        Specific dask options for computation are controlled by
        :class:`iris.options.Parallel`.
@dkillick This is stale; should it be iris.config.Parallel, or iris.config.parallel?
@bjlittle it should be :class:`iris.config.Parallel`, as Parallel is (a) actually a class that can be linked to by Sphinx, and (b) has a docstring. I'll change this, and all of the similar references below...
lib/iris/config.py
Outdated
    * Specify that we want to load a cube with dask parallel processing
      using multiprocessing with six worker processes::

          iris.options.parallel(scheduler='multiprocessing', num_workers=6)
lib/iris/config.py
Outdated
    * Specify, with a context manager, that we want to load a cube with
      dask parallel processing using four worker threads::

          with iris.options.parallel(scheduler='threaded', num_workers=4):
@dkillick iris.config.parallel
lib/iris/config.py
Outdated
    * Run dask parallel processing using a distributed scheduler that has
      been set up at the IP address and port at ``192.168.0.219:8786``::

          iris.options.parallel(scheduler='192.168.0.219:8786')
I thought I'd fixed all of them 😒
    # NOTE: tests that call the option class directly and as a contextmgr.

    def test_bad_name__contextmgr(self):
        # Check we can't do `with iris.options.parallel.context('foo'='bar')`.
lib/iris/config.py
Outdated
    try:
        import distributed
    except ImportError:
        DISTRIBUTED_AVAILABLE = False
@dkillick Would you consider this slightly neater pattern (IMHO):

    try:
        import distributed
    except ImportError:
        distributed = None

Then later in the code, rather than checking for `if DISTRIBUTED_AVAILABLE:` it's simply `if distributed is not None:` or `if distributed:`. See here ...
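A minimal, self-contained sketch of the suggested guard, assuming it sits at module level in lib/iris/config.py; the distributed import is from the discussion, while the helper function is purely illustrative:

    # Suggested pattern: bind the optional dependency itself (or None) rather
    # than tracking a separate DISTRIBUTED_AVAILABLE boolean.
    try:
        import distributed
    except ImportError:
        distributed = None


    def _distributed_usable():
        # Illustrative helper: later code can simply test the module object.
        return distributed is not None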
lib/iris/config.py
Outdated
    import contextlib
    from multiprocessing import cpu_count
    from multiprocessing.pool import ThreadPool
    import re
@dkillick Import order: move this below the following line ...
lib/iris/config.py
Outdated
    if scheduler in ['threaded', 'multiprocessing']:
        num_workers = self.get('num_workers')
        pool = ThreadPool(num_workers)
    if scheduler == 'distributed':
    default = self._defaults_dict['scheduler']['default']
    if value is None:
        value = default
    elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value):
@dkillick You could compile this re ...
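A short sketch of the compile-the-regex suggestion; the pattern is copied from the diff above, and the constant and helper names are illustrative only:

    import re

    # Compile the scheduler-address pattern once at module import, instead of
    # re-parsing it on every call to set_scheduler (pattern taken from the
    # diff: an IPv4 address followed by a port).
    _SCHEDULER_ADDRESS = re.compile(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$')


    def _is_scheduler_address(value):
        # Illustrative helper: True for strings such as '192.168.0.219:8786'.
        return bool(_SCHEDULER_ADDRESS.match(value))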
corinnebosley left a comment:
Nice proposal Pete, I think that a lot of people will appreciate the ability to manipulate these options.
    https://distributed.readthedocs.io/en/latest/index.html.

    Example usages:
@dkillick I'm really glad you put some example usages in here. And I like the amount of detail you have put into the descriptions of the args.
    if value >= cpu_count():
        # Limit maximum CPUs used to 1 fewer than all available CPUs.
        wmsg = ('Requested more CPUs ({}) than total available ({}). '
                'Limiting number of used CPUs to {}.')
@dkillick Is this going to be a bit confusing? What if somebody requests the maximum number of CPUs available, and is then only allowed to use one less than the max available, but the warning message states that the total available is the same number that they requested?
For example, let's say my CPU count is 8, and as a parallelisation beginner I want to see how fast my script is when I run it on all 8 CPUs, so I request 8. This isn't allowed by set_num_workers, so I get a warning message which says:
'Requested more CPUs (8) than total available (8). Limiting number of used CPUs to 7'
I would find this message confusing, and it would make me angry. Maybe it just needs a little clarification that you can't use all the CPUs available because it's silly.
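A hedged sketch of one way to address this: keep the existing cap of one fewer than cpu_count(), but word the warning so the reason for the cap is stated. Only the >= cpu_count() check and the general shape of the message come from the diff; the function name and exact wording are illustrative:

    import warnings
    from multiprocessing import cpu_count


    def _clamp_num_workers(value):
        # Illustrative rewording: make it explicit that one CPU is
        # deliberately kept free, so asking for all (or more than all)
        # of the CPUs gets capped.
        max_workers = cpu_count() - 1
        if value > max_workers:
            wmsg = ('Requested {} workers, but Iris will leave one CPU free '
                    'and use at most {} of the {} CPUs available. '
                    'Limiting num_workers to {}.')
            warnings.warn(wmsg.format(value, max_workers, cpu_count(),
                                      max_workers))
            value = max_workers
        return value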
    pool = self.pool
    get = dask.async.get_sync
    self.patch_set_options.assert_called_once_with(pool=None, get=get)
@dkillick I'm confused, if pool=None in the assert on L.137, why do you need 'pool = self.pool' on L.135?
Because we want to make sure it isn't used in this case! Nice spot though 👍
I'm not sure I like the idea of overriding ...
    def set__scheduler(self, value):
        return value

    def set_scheduler(self, value):
These set_<key> methods are confusingly named. They don't actually set anything. A better name would be convert_<key>, IMO.
They define values that are set for keys in the instance's underlying __dict__. In this sense they behave very similarly to the @property.setter in a more traditional class.
I think renaming them to convert_xxx would be more confusing as I think "converting" is further from what they're doing than "setting" is.
they behave very similarly to the @property.setter in a more traditional class
But they don't behave like a setter because they don't set anything!
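To make the naming debate concrete, here is a stripped-down sketch of how the set_<key> methods participate in attribute setting, modelled on the __setattr__ shown further down this page; the class, its default, and the example calls are illustrative, not the actual Iris code:

    class OptionSketch(object):
        # Illustrative stand-in: __setattr__ dispatches to a set_<name>
        # method, and whatever that method *returns* is what gets stored, so
        # the methods act as validators/converters feeding the assignment.
        _defaults_dict = {'scheduler': {'default': 'threaded'}}

        def set_scheduler(self, value):
            # Validate/convert the requested value; here, just pass through.
            return value

        def __setattr__(self, name, value):
            if value is None:
                value = self._defaults_dict[name]['default']
            attr_setter = getattr(self, 'set_{}'.format(name))
            self.__dict__[name] = attr_setter(value)


    opts = OptionSketch()
    opts.scheduler = None       # falls back to the 'threaded' default
    opts.scheduler = 'multiprocessing'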
    # Distributed not available.
    wmsg = 'Cannot import distributed. Defaulting to {}.'
    warnings.warn(wmsg.format(default))
    self.set_scheduler(default)
This doesn't actually do anything, assuming the default scheduler is always the same. Did you mean to return this?
My intention is that the scheduler setter is re-run on the default value in case it throws up any problems with setting the scheduler to the default in this case. Either way, the default value will very likely be returned in the next call of set_scheduler.
But what is returned from this function is value, and that isn't changed if this else section is reached. So ultimately what will be set in the attribute is the original value passed in, despite what the warning says.
Actually, I see what happens. When Option.__setattr__ is called, it doesn't recognise the address as a legitimate value, and sets the attribute to the default. But I still don't understand the point of this self.set_scheduler(default) call.
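A hedged sketch of the fix the reviewer appears to be asking for: in the unavailable-distributed branch, return the default rather than calling self.set_scheduler(default) purely for its side effects. The regex and warning text come from the diff; wrapping the logic as a standalone function is illustrative:

    import re
    import warnings

    _ADDRESS = re.compile(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$')


    def choose_scheduler(value, default, distributed_available):
        # Illustrative reworking of the set_scheduler branch under discussion.
        if value is None:
            return default
        if _ADDRESS.match(value):
            if distributed_available:
                return 'distributed'
            # Distributed not available: warn, and make sure the *returned*
            # value is the default, so the stored attribute matches the
            # warning that was just issued.
            wmsg = 'Cannot import distributed. Defaulting to {}.'
            warnings.warn(wmsg.format(default))
            return default
        return value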
        value = default
    elif re.match(r'^(\d{1,3}\.){3}\d{1,3}:\d{1,5}$', value):
        if DISTRIBUTED_AVAILABLE:
            value = 'distributed'
If someone tries to set the scheduler to a new address, aren't you just discarding it here?
Right, so it's only set in __init__. If I do iris.options.parallel.scheduler = <address>, that won't change what's saved in _scheduler.
lib/iris/config.py
Outdated
    def __setattr__(self, name, value):
        if value is None:
            value = self._defaults_dict[name]['default']
        attr_setter = getattr(self, 'set_{}'.format(name))
This is pretty ugly, IMO. I like the way Matplotlib handles this. In addition to storing default values, they also store a validator/converter function for each parameter (See matplotlib/rcsetup.py)
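A compact sketch of the Matplotlib-style arrangement the reviewer points at: each parameter carries both a default and a validator/converter, and __setattr__ simply dispatches through that table. All of the names and validation rules here are illustrative; see matplotlib/rcsetup.py for the real thing:

    def _validate_scheduler(value):
        # Illustrative validator: accept only the known scheduler names.
        if value not in ('threaded', 'multiprocessing', 'distributed'):
            raise ValueError('unknown scheduler: {!r}'.format(value))
        return value


    def _validate_num_workers(value):
        # Illustrative validator/converter: coerce to int, refuse < 1.
        value = int(value)
        if value < 1:
            raise ValueError('num_workers must be at least 1')
        return value


    # Per-parameter (default, validator) table, in the spirit of Matplotlib's
    # defaultParams in rcsetup.py.
    _PARAMS = {
        'scheduler': ('threaded', _validate_scheduler),
        'num_workers': (1, _validate_num_workers),
    }


    class ParallelSketch(object):
        def __setattr__(self, name, value):
            default, validate = _PARAMS[name]
            if value is None:
                value = default
            self.__dict__[name] = validate(value)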
Closed on account of excessive complexity and replaced by #2511.
Introduces a new module, iris.options. Within this module we can provide classes that control certain elements of Iris' behaviour within a limited scope, such as the lifetime of the session, or within a context manager.

Specifically, this PR introduces the Parallel class, used to control dask parallel processing options for Iris. The Parallel class allows for control of:

* the dask scheduler that is used when compute is called (defaults to threaded), and
* the number of workers used (for the threaded and multiprocessing scheduler options).

Note: no tests yet (I'll do them next); I was keen to get eyes on the functional code I've proposed here.
The pattern proposed here also has an obvious application to the proposal in #2457.
Addresses #2403.
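For orientation, a sketch of the intended usage pulled together from the docstring examples quoted above. It assumes the proposed module as written in those examples (whether it ends up named iris.options or iris.config is still being debated in this thread, and the PR was ultimately closed in favour of #2511, so this exact API may never have shipped); my_dataset.nc is the filename used in the docstring:

    import iris

    # Set session-wide options: dask multiprocessing scheduler with six
    # worker processes, then load as usual (per the docstring example).
    iris.options.parallel(scheduler='multiprocessing', num_workers=6)
    cubes = iris.load('my_dataset.nc')

    # Or scope the options to a block via the context-manager form, using
    # four worker threads (also per the docstring example).
    with iris.options.parallel(scheduler='threaded', num_workers=4):
        cubes = iris.load('my_dataset.nc')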